---
title: engine
sidebarTitle: engine
---

# `prefect.utilities.engine`

## Functions

### `collect_task_run_inputs` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L66" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
collect_task_run_inputs(expr: Any, max_depth: int = -1) -> set[Union[TaskRunResult, FlowRunResult]]
```


This function recurses through an expression to generate a set of any discernible
task run inputs it finds in the data structure. It produces a set of all inputs
found.

Examples:

    ```python
    task_inputs = {
        k: await collect_task_run_inputs(v) for k, v in parameters.items()
     }
    ```


### `collect_task_run_inputs_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L117" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
collect_task_run_inputs_sync(expr: Any, future_cls: Any = PrefectFuture, max_depth: int = -1) -> set[Union[TaskRunResult, FlowRunResult]]
```


This function recurses through an expression to generate a set of any discernible
task run inputs it finds in the data structure. It produces a set of all inputs
found.

**Examples:**

```python
task_inputs = {
    k: collect_task_run_inputs_sync(v) for k, v in parameters.items()
 }
```


### `capture_sigterm` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L172" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
capture_sigterm() -> Generator[None, Any, None]
```

### `resolve_inputs` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L201" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
resolve_inputs(parameters: dict[str, Any], return_data: bool = True, max_depth: int = -1) -> dict[str, Any]
```


Resolve any `Quote`, `PrefectFuture`, or `State` types nested in parameters into
data.

**Returns:**
- A copy of the parameters with resolved data

**Raises:**
- `UpstreamTaskError`: If any of the upstream states are not `COMPLETED`


### `propose_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L315" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
propose_state(client: 'PrefectClient', state: State[Any], flow_run_id: UUID, force: bool = False) -> State[Any]
```


Propose a new state for a flow run, invoking Prefect orchestration logic.

If the proposed state is accepted, the provided `state` will be augmented with
 details and returned.

If the proposed state is rejected, a new state returned by the Prefect API will be
returned.

If the proposed state results in a WAIT instruction from the Prefect API, the
function will sleep and attempt to propose the state again.

If the proposed state results in an ABORT instruction from the Prefect API, an
error will be raised.

**Args:**
- `state`: a new state for a flow run
- `flow_run_id`: an optional flow run id, used when proposing flow run states

**Returns:**
- a State model representation of the flow run state

**Raises:**
- `prefect.exceptions.Abort`: if an ABORT instruction is received from
the Prefect API


### `propose_state_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L413" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
propose_state_sync(client: 'SyncPrefectClient', state: State[Any], flow_run_id: UUID, force: bool = False) -> State[Any]
```


Propose a new state for a flow run, invoking Prefect orchestration logic.

If the proposed state is accepted, the provided `state` will be augmented with
 details and returned.

If the proposed state is rejected, a new state returned by the Prefect API will be
returned.

If the proposed state results in a WAIT instruction from the Prefect API, the
function will sleep and attempt to propose the state again.

If the proposed state results in an ABORT instruction from the Prefect API, an
error will be raised.

**Args:**
- `state`: a new state for the flow run
- `flow_run_id`: an optional flow run id, used when proposing flow run states

**Returns:**
- a State model representation of the flow run state

**Raises:**
- `ValueError`: if flow_run_id is not provided
- `prefect.exceptions.Abort`: if an ABORT instruction is received from
the Prefect API


### `get_state_for_result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L507" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
get_state_for_result(obj: Any) -> Optional[tuple[State, RunType]]
```


Get the state related to a result object.

`link_state_to_result` must have been called first.


### `link_state_to_flow_run_result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L518" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
link_state_to_flow_run_result(state: State, result: Any) -> None
```


Creates a link between a state and flow run result


### `link_state_to_task_run_result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L523" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
link_state_to_task_run_result(state: State, result: Any) -> None
```


Creates a link between a state and task run result


### `link_state_to_result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L528" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
link_state_to_result(state: State, result: Any, run_type: RunType) -> None
```


Caches a link between a state and a result and its components using
the `id` of the components to map to the state. The cache is persisted to the
current flow run context since task relationships are limited to within a flow run.

This allows dependency tracking to occur when results are passed around.
Note: Because `id` is used, we cannot cache links between singleton objects.

We only cache the relationship between components 1-layer deep.
Example:
    Given the result [1, ["a","b"], ("c",)], the following elements will be
    mapped to the state:
    - [1, ["a","b"], ("c",)]
    - ["a","b"]
    - ("c",)

    Note: the int `1` will not be mapped to the state because it is a singleton.

Other Notes:
We do not hash the result because:
- If changes are made to the object in the flow between task calls, we can still
  track that they are related.
- Hashing can be expensive.
- Not all objects are hashable.

We do not set an attribute, e.g. `__prefect_state__`, on the result because:

- Mutating user's objects is dangerous.
- Unrelated equality comparisons can break unexpectedly.
- The field can be preserved on copy.
- We cannot set this attribute on Python built-ins.


### `should_log_prints` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L589" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
should_log_prints(flow_or_task: Union['Flow[..., Any]', 'Task[..., Any]']) -> bool
```

### `check_api_reachable` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L601" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
check_api_reachable(client: 'PrefectClient', fail_message: str) -> None
```

### `emit_task_run_state_change_event` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L619" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
emit_task_run_state_change_event(task_run: TaskRun, initial_state: Optional[State[Any]], validated_state: State[Any], follows: Optional[Event] = None) -> Optional[Event]
```

### `resolve_to_final_result` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L717" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
resolve_to_final_result(expr: Any, context: dict[str, Any]) -> Any
```


Resolve any `PrefectFuture`, or `State` types nested in parameters into
data. Designed to be use with `visit_collection`.


### `resolve_inputs_sync` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/utilities/engine.py#L788" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
resolve_inputs_sync(parameters: dict[str, Any], return_data: bool = True, max_depth: int = -1) -> dict[str, Any]
```


Resolve any `Quote`, `PrefectFuture`, or `State` types nested in parameters into
data.

**Returns:**
- A copy of the parameters with resolved data

**Raises:**
- `UpstreamTaskError`: If any of the upstream states are not `COMPLETED`

